package flinkstudy.stream.process;

import flinkstudy.bo.EstateInfo;
import flinkstudy.stream.environment.FlinkStreamExecutionEnvironment;
import flinkstudy.stream.source.mock.DynamicUnlimitedData;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

import java.util.LongSummaryStatistics;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * flink ProcessFunction
 *
 * @author daocr
 * @date 2020/2/7
 * @see org.apache.flink.streaming.api.functions.ProcessFunction
 * @see org.apache.flink.streaming.api.functions.KeyedProcessFunction
 * @see org.apache.flink.streaming.api.functions.co.CoProcessFunction
 * @see org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
 * @see org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
 * @see org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
 * @see org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
 * @see org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction
 */
public class ProcessFunctionInterface {

    private FlinkStreamExecutionEnvironment flinkStreamExecutionEnvironment = null;

    @Before
    public void init() {
        flinkStreamExecutionEnvironment = new FlinkStreamExecutionEnvironment();
    }


    /**
     * Creates an unbounded mock stream of {@link EstateInfo} records.
     *
     * @return a {@link DataStreamSource} backed by {@link DynamicUnlimitedData}
     */
    public DataStreamSource<EstateInfo> createSource() {
        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        return env.addSource(new DynamicUnlimitedData());
    }

    /**
     * Transforms every record in the stream, one element at a time.
     *
     * @see org.apache.flink.streaming.api.functions.ProcessFunction
     */
    @Test
    public void processFunctionTest() throws Exception {
        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        DataStreamSource<EstateInfo> source = createSource();

        source.process(new ProcessFunction<EstateInfo, EstateInfo>() {
            @Override
            public void processElement(EstateInfo value, Context ctx, Collector<EstateInfo> out) throws Exception {

                // A dimension-table lookup could happen here to enrich the record
                // before it is re-emitted into the stream.
                out.collect(value);
            }
        }).print();


        env.execute();
    }

    /**
     * Demonstrates per-key buffering with an event-time timer; must be used
     * after {@code keyBy}. Every 10 seconds of event time (per key) the
     * buffered prices are summarized and emitted as one line of text.
     *
     * @see org.apache.flink.streaming.api.functions.KeyedProcessFunction
     */
    @Test
    public void KeyedProcessFunctionTest() throws Exception {
        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<EstateInfo> source = createSource();

        // Event-time timestamps come from the record's creation time; the mock
        // source presumably emits them in ascending order — otherwise a
        // bounded-out-of-orderness watermark strategy would be required.
        SingleOutputStreamOperator<EstateInfo> estateInfoSingleOutputStreamOperator = source.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EstateInfo>() {
            @Override
            public long extractAscendingTimestamp(EstateInfo element) {
                return element.getCreateTime().getTime();
            }
        });

        // Partition the stream by city so state and timers are scoped per city.
        KeyedStream<EstateInfo, Integer> estateInfoIntegerKeyedStream = estateInfoSingleOutputStreamOperator.keyBy(new KeySelector<EstateInfo, Integer>() {
            @Override
            public Integer getKey(EstateInfo value) throws Exception {
                return value.getCityId();
            }
        });

        SingleOutputStreamOperator<String> price = estateInfoIntegerKeyedStream.process(new KeyedProcessFunction<Integer, EstateInfo, String>() {

            // Per-key buffer of records awaiting the next timer firing.
            private ListState<EstateInfo> listState;

            // Per-key flag: has a timer already been registered for the current key?
            // BUG FIX: the original used a plain boolean instance field, which is
            // shared by every key handled by the same operator instance — only the
            // first key seen would ever register a timer. Keyed ValueState scopes
            // the flag to the current key, as timers themselves are key-scoped.
            private ValueState<Boolean> timerRegistered;

            @Override
            public void open(Configuration parameters) throws Exception {

                // No cast needed: ListStateDescriptor accepts any TypeInformation.
                ListStateDescriptor<EstateInfo> price = new ListStateDescriptor<>("price", TypeInformation.of(EstateInfo.class));

                listState = getRuntimeContext().getListState(price);

                timerRegistered = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("timerRegistered", Boolean.class));

            }

            @Override
            public void processElement(EstateInfo value, Context ctx, Collector<String> out) throws Exception {
                listState.add(value);
                // ValueState.value() is null until first written for this key.
                if (!Boolean.TRUE.equals(timerRegistered.value())) {
                    // 1. Register an event-time timer 10s after this element.
                    ctx.timerService().registerEventTimeTimer(value.getCreateTime().getTime() + 10000);
                    timerRegistered.update(true);
                }
            }

            /**
             * Callback invoked when the registered event-time timer fires:
             * summarizes the buffered prices for the current key, emits one
             * summary line, then clears the buffer.
             *
             * @param timestamp the timer's timestamp
             * @param ctx       timer context (exposes the current key)
             * @param out       collector for the summary string
             * @throws Exception on state-access failure
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

                // Allow the next element of this key to register a fresh timer.
                timerRegistered.update(false);

                Integer currentKey = ctx.getCurrentKey();
                // Fetch the buffered records for this key.
                Iterable<EstateInfo> estateInfos = listState.get();


                if (estateInfos != null) {
                    // 2. Aggregate count/avg/min/max/sum over the buffered prices.
                    LongSummaryStatistics collect = StreamSupport.stream(estateInfos.spliterator(), false).collect(Collectors.summarizingLong(EstateInfo::getPrice));

                    // StringBuilder: local, single-threaded — no need for the
                    // synchronized StringBuffer.
                    StringBuilder sb = new StringBuilder();

                    sb.append("cityId: ").append(currentKey);
                    sb.append("\tcount : ").append(collect.getCount());
                    // (long) cast truncates toward zero, same as the deprecated
                    // new Double(...).longValue() the original used.
                    sb.append("\tavg : ").append((long) collect.getAverage());
                    sb.append("\tmin : ").append(collect.getMin());
                    sb.append("\tmax : ").append(collect.getMax());
                    sb.append("\tsum : ").append(collect.getSum());

                    out.collect(sb.toString());

                    // 3. Clear the per-key buffer after the summary is emitted.
                    listState.clear();
                }
            }
        });

        price.print();

        env.execute();
    }


    /**
     * Merges two streams, processing each side with its own callback.
     *
     * @see org.apache.flink.streaming.api.functions.co.CoProcessFunction
     */
    @Test
    public void CoProcessFunctionTest() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        DataStreamSource<EstateInfo> source1 = createSource();
        DataStreamSource<EstateInfo> source2 = createSource();

        SingleOutputStreamOperator<EstateInfo> process = source1.connect(source2)
                .process(new CoProcessFunction<EstateInfo, EstateInfo, EstateInfo>() {
                    @Override
                    public void processElement1(EstateInfo value, Context ctx, Collector<EstateInfo> out) throws Exception {
                        out.collect(value);
                    }

                    @Override
                    public void processElement2(EstateInfo value, Context ctx, Collector<EstateInfo> out) throws Exception {
                        out.collect(value);
                    }
                });

        process.print();

        env.execute();


    }
}
